-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement SinglePartitionSubscriber. #22
Conversation
This handles mapping a single partition to a Cloud Pub/Sub Like asynchronous subscriber.
sized_message = self._messages_by_offset[offset] | ||
try: | ||
self._nack_handler.on_nack(sized_message.message, | ||
lambda: self._queue.put(requests.AckRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to use AckRequests? It seems simpler to just call _handle_ack() here. What am I missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_handle_ack is a coroutine. the nack handler function is a Callable[[PubsubMessage], None] to enable it being called from other threads that are not part of the event loop. It would be harder to call _handle_ack from here than it would be to just put an AckRequest on the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
self.fail(e) | ||
|
||
async def _handle_queue_message(self, message: Union[ | ||
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the queue really have all these kinds of messages? Do we ever want them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can. This is the protocol I backed out from here https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/message.py
Technically drop and modack requests can be sent from the message, but as you can see on the line below we immediately fail the client if those requests are sent.
else: | ||
self._handle_nack(message) | ||
|
||
async def _looper(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use asyncio's default event_loop for asynchronicity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does use asyncio's default event loop? This is just a coroutine running in it that polls for a queue.Queue to have a message. There is asyncio.Queue (used elsewhere) which you can await on, but Message.ack/nack (from CPS' client library) needs to be able to be called from other threads, so it uses the threadsafe queue instead of the asyncio-enabled one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -0,0 +1,11 @@ | |||
from typing import NamedTuple |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meta comment; do you mind setting your editor's line length limit to 100 lines? The line breaks in your PRs look quite unreadable; this is fine, but I would appreciate it for the next ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After talking with tianzi, there's an auto-formatter that exists. I'll run that after this chain of prs is submitted since it would be quite difficult to fix the commit chain history :/ I hope thats acceptable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, totally fine. Thanks.
else: | ||
self._handle_nack(message) | ||
|
||
async def _looper(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here?
sized_message = self._messages_by_offset[offset] | ||
try: | ||
self._nack_handler.on_nack(sized_message.message, | ||
lambda: self._queue.put(requests.AckRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before.
@@ -0,0 +1,11 @@ | |||
from typing import NamedTuple |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, totally fine. Thanks.
This handles mapping a single partition to a Cloud Pub/Sub Like asynchronous subscriber.